Cloud Functions から Cloud Scheduler ジョブを作成/更新して、1つ目の関数実行から一定時間経過後に次の関数を実行してみた。
こんにちは、みかみです。
10年にいちどの寒波とも言われている今週は、さすがに沖縄も寒かったです。。(これが終われば春が来るはず!?
やりたいこと
- 1つ目の Cloud Functions 実行から一定時間経過後に、2つ目の Cloud Functions を実行したい
- 1つ目の Cloud Functions で Cloud Scheduler ジョブを作成/更新したい
イベント駆動で実行したいけど、次の処理実行までにある程度時間を空けたい。
数分の待機時間であれば、簡単に思いつくのは元の処理で sleep
して一定時間待機する方法ですが、Cloud Functions を使う場合実行時間制限があるし、せっかくのサーバレスフレームワーク、sleep でリソース使い続けるのももったいない。。
イベント駆動で実行する1つ目の Cloud Functions で Cloud Scheduler ジョブを作成/更新して、2つ目の Cloud Functions を実行してみます。 1つ目の Cloud Functions は前回のブログ で使用した、BigQuery へのデータ INSERT をトリガに実行する関数です。 BigQuery へのデータ INSERT が行われてから一定時間後に、次の Cloud Functions を実行します。
前提
Google Cloud SDK(gcloud
コマンド)の実行環境は準備済みであるものとします。
本エントリでは、Cloud Shell を使用しました。
以下のコマンドで、Cloud Scheduler の Python ライブラリをインストール済みです。
pip install google-cloud-scheduler
Eventarc API や、本エントリで利用している BigQuery, Cloud Functions, Cloud Build などの API は有効化済みで、操作するアカウントに各設定に必要な権限は付与済みです。 動作確認では、両方のプロジェクトでプロジェクトオーナーロールを付与したアカウントを使用しています。。
また、Cloud Functions デフォルトサービスアカウント([PROJECT_NUMBER][email protected]
)に、Eventarc、Pub/Sub および Cloud Scheduler の操作権限を付与済みです。
Pub/Sub トピックを作成
Cloud Scheduler がメッセージを Publish する Pub/Sub トピックを作成します。
Cloud Shell から以下のコマンドを実行しました。
gcloud pubsub topics create sample_scheduler_topic
1つ目の Cloud Functions に Cloud Scheduler ジョブ作成/更新処理を追加
前回 デプロイした、BigQuery のデータ INSERT で実行される Cloud Functions に、Cloud Scheduler ジョブ作成/更新処理を追加しました。
import base64 import json import functions_framework from google.cloud import scheduler_v1 from google.protobuf import field_mask_pb2 import datetime from pytz import timezone @functions_framework.cloud_event def fetch_insert_log_sync(cloud_event): message = json.loads(base64.b64decode(cloud_event.data["message"]["data"])) print(message) protoPayload = message.get('protoPayload', None) metadata = protoPayload.get('metadata', None) tableDataChange = metadata.get('tableDataChange', None) if tableDataChange is None: print('no tableDataChange...') return insertedRowsCount = tableDataChange.get('insertedRowsCount', None) if insertedRowsCount is None: print('no INSERT data...') resourceName = protoPayload.get('resourceName', None) project = resourceName.split('/')[1] dataset = resourceName.split('/')[3] table = resourceName.split('/')[5] print(f'**** insert data: {project}.{dataset}.{table}({insertedRowsCount} rows) ****') job_name = f'projects/cm-da-mikami-yuki-258308/locations/asia-northeast1/jobs/{dataset}_{table}' now = datetime.datetime.now(timezone('Asia/Tokyo')) schedule = now + datetime.timedelta(minutes=5) hour = schedule.hour minute = schedule.minute schedule = f'{str(minute).zfill(2)} {str(hour).zfill(2)} * * *' client = scheduler_v1.CloudSchedulerClient() try: request = scheduler_v1.GetJobRequest( name=job_name ) job = client.get_job(request=request) job.schedule = schedule param = f'{{"params" : {{"dataset": "{dataset}", "table": "{table}", "status": "updated"}}}}' job.pubsub_target = scheduler_v1.types.PubsubTarget( topic_name='projects/cm-da-mikami-yuki-258308/topics/sample_scheduler_topic', data=bytes(param, 'utf-8') ) update_mask = field_mask_pb2.FieldMask(paths=['schedule', 'pubsub_target']) request = scheduler_v1.UpdateJobRequest( job=job, update_mask=update_mask ) response = client.update_job(request=request) print(response) except: print('No Job.') job = scheduler_v1.Job() job.name = job_name job.description = 'test_schedule' param = f'{{"params" : {{"dataset": "{dataset}", "table": "{table}", "status": "create"}}}}' job.pubsub_target = scheduler_v1.types.PubsubTarget( topic_name='projects/cm-da-mikami-yuki-258308/topics/sample_scheduler_topic', data=bytes(param, 'utf-8') ) job.schedule = schedule job.time_zone = 'Asia/Tokyo' request = scheduler_v1.CreateJobRequest( parent='projects/cm-da-mikami-yuki-258308/locations/asia-northeast1', job=job ) response = client.create_job(request=request) print(response)
データ INSERT イベントで通知された [データセット名]_[テーブル名]
という名前の Cloud Scheduler ジョブが既にあるかどうか確認し
job_name = f'projects/cm-da-mikami-yuki-258308/locations/asia-northeast1/jobs/{dataset}_{table}}' (省略) client = scheduler_v1.CloudSchedulerClient() try: request = scheduler_v1.GetJobRequest( name=job_name ) job = client.get_job(request=request) (省略) except: (省略)
同名の Scheduler ジョブが既にある場合は、実行スケジュールを今から5分後に、Scheduler 実行パラメータの status
値を updated
に更新します。
(省略) now = datetime.datetime.now(timezone('Asia/Tokyo')) schedule = now + datetime.timedelta(minutes=5) hour = schedule.hour minute = schedule.minute schedule = f'{str(minute).zfill(2)} {str(hour).zfill(2)} * * *' client = scheduler_v1.CloudSchedulerClient() try: (省略) job.schedule = schedule param = f'{{"params" : {{"dataset": "{dataset}", "table": "{table}", "status": "updated"}}}}' job.pubsub_target = scheduler_v1.types.PubsubTarget( topic_name='projects/cm-da-mikami-yuki-258308/topics/sample_scheduler_topic', data=bytes(param, 'utf-8') ) update_mask = field_mask_pb2.FieldMask(paths=['schedule', 'pubsub_target']) request = scheduler_v1.UpdateJobRequest( job=job, update_mask=update_mask ) response = client.update_job(request=request) print(response) (省略)
同名の Scheduler ジョブが無い場合には、今から5分後に Scheduler パラメータ status
が created
で実行するジョブを新規作成します。
(省略) now = datetime.datetime.now(timezone('Asia/Tokyo')) schedule = now + datetime.timedelta(minutes=5) hour = schedule.hour minute = schedule.minute schedule = f'{str(minute).zfill(2)} {str(hour).zfill(2)} * * *' client = scheduler_v1.CloudSchedulerClient() try: (省略) except: print('No Job.') job = scheduler_v1.Job() job.name = job_name job.description = 'test_schedule' param = f'{{"params" : {{"dataset": "{dataset}", "table": "{table}", "status": "created"}}}}' job.pubsub_target = scheduler_v1.types.PubsubTarget( topic_name='projects/cm-da-mikami-yuki-258308/topics/sample_scheduler_topic', data=bytes(param, 'utf-8') ) job.schedule = schedule job.time_zone = 'Asia/Tokyo' request = scheduler_v1.CreateJobRequest( parent='projects/cm-da-mikami-yuki-258308/locations/asia-northeast1', job=job ) response = client.create_job(request=request) print(response)
合わせて、以下の requirements.txt
も追加します。
google-cloud-scheduler>=2.9.0 pytz>=2022.1
以下のコマンドで修正コードをデプロイしました。
gcloud functions deploy fetch-insert-log-sync \ --gen2 \ --region asia-northeast1 \ --runtime python310 \ --entry-point=fetch_insert_log_sync \ --trigger-topic="audit_log_sync" \ --trigger-location=asia-northeast1
2つ目の Cloud Functions を追加
1つ目の Cloud Functions が作成/更新した Cloud Scheduler ジョブから実行される、2つ目の Cloud Functions をデプロイします。
処理内容は、Scheduler から渡されたパラメータの内容をログ出力するだの簡易なものです。
以下の python コードを、main.py
というファイル名で保存しました。
import base64 import json import functions_framework @functions_framework.cloud_event def print_pubsub_message(cloud_event): message = json.loads(base64.b64decode(cloud_event.data["message"]["data"])) param = message.get("params") print(param.get("dataset")) print(param.get("table")) print(param.get("status"))
以下のコマンドでデプロイしました。
gcloud functions deploy sample-scheduler-function \ --gen2 \ --region asia-northeast1 \ --runtime python310 \ --entry-point=print_pubsub_message \ --trigger-topic="sample_scheduler_topic" \ --trigger-location=asia-northeast1
動作確認
Scheduler ジョブ新規作成
前回 同様、BigQuery INSERT イベントを発行して、1つ目の Cloud Functions を実行しました。
ログを確認すると
正常に Cloud Scheduler ジョブ作成できたようです。
管理コンソールからも、Cloud Scheduler ジョブが作成されていることが確認できました。
Scheduler で指定した時間になるのを待って、2つ目の Cloud Functions が実行されたか確認してみます。
期待通り、1つ目の Cloud Functions で作成した Cloud Scheduler ジョブで指定した時間に、2つ目の Cloud Functions が実行されたことが確認できました。
Scheduler ジョブ更新
続いて、既に同名の Cloud Scheduler ジョブがある状態で、実行時間とパラメータが更新できるか確認してみます。
先ほど同様、BigQuery INSERT イベントを発行して、1つ目の Cloud Functions を実行しました。
ログから Cloud Scheduler ジョブが正常に更新されたことが確認できました。
管理コンソールからも、Scheduler ジョブの実行時間が更新されたことが確認できます。
更新した Scheduler 実行時間まで待って、2つ目の Cloud Functions のログも確認してみます。
期待通り、更新時間に、更新パラメータで実行されたことが確認できました。
まとめ(所感)
これまで、Cloud Scheduler ジョブはあらかじめ作成しておいて、Cloud Functions や Cloud Workflows のスケジュール実行トリガに使用していましたが、Scheduler ジョブをプログラムで作成/更新できれば、より実装の幅が広がるのではないかと思います。
例えば「前回の処理の正常終了から一定期間後」に次の処理を実行したい場合、スケジュール実行にしてしまうと、厳密には前回の処理実行時間分次の処理開始時間が早まってしまいますし、前回の処理が異常終了した場合でも次の処理が動いてしまいます。
イベント駆動とスケジュール実行を組み合わせることにより、より柔軟に対応できてうれしいと思いました。
参考
- Python Client for Cloud Scheduler API | Google Cloud リファレンス
- Class CloudSchedulerClient (2.9.1) | Google Cloud リファレンス
- Class Job (2.9.1) | Google Cloud リファレンス
- Class PubsubTarget (2.9.1) | Google Cloud リファレンス
- Pub/Sub を使用して Cloud ファンクションをトリガーする | Cloud Scheduler ドキュメント
- googleapis/python-scheduler | GitHub
- How to create a job with Google Cloud scheduler Python api | stack overflow
- How to update GCP Scheduler Jobs with Python | stack overflow